package com.tdy.cdc.app.dwd;

import com.tdy.cdc.app.BaseSqlApp;
import com.tdy.cdc.common.Constant;
import com.tdy.cdc.util.FlinkSinkUtil;
import com.tdy.cdc.util.FlinkSourceUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

/**
 * @author NanHuang
 * @Date 2023/3/14
 */
public class App11_DwdMhsIptAdmissionnote extends BaseSqlApp {
    public static void main(String[] args) {
        new App11_DwdMhsIptAdmissionnote().init(
                5011,
                "App11_DwdMhsIptAdmissionnote",
                2
        );
    }

    @Override
    protected void invoke(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv) {
        // 1 创建源表
        createSourceTables(tableEnv);
        // 2 join
        joinTables(tableEnv);
        // 3 写入Kafka
        writeToKafka(tableEnv);

    }

    private void writeToKafka(StreamTableEnvironment tableEnv) {
        tableEnv.executeSql("create table  target(" +
                "RECORD_ID                            String,"+
                "HOSPITAL_NAME                        String,"+
                "HOSPITAL_CODE                        String,"+
                "AGE_UNIT                             String,"+
                "AGE                                  String,"+
                "SEX_NAME                             String,"+
                "SEX_CODE                             String,"+
                "ID_CARD_NUMBER                       String,"+
                "PATIENT_VISIT_CATEGORY_CODE          String,"+
                "PATIENT_VISIT_CATEGORY               String,"+
                "INPATIENT_VISIT_FLOW_ID              String,"+
                "INPATIENT_NO                         String,"+
                "PATIENT_ID                           String,"+
                "PATIENT_NAME                         String,"+
                "DOMAIN_ID                            String,"+
                "VISIT_TIMES                          String,"+
                "CURRENT_DOOR_NUMBER                  String,"+
                "CURRENT_VILLAGE                      String,"+
                "CURRENT_TOWNSHIP                     String,"+
                "CURRENT_COUNTY                       String,"+
                "CURRENT_CITY                         String,"+
                "CURRENT_PROVINCE                     String,"+
                "MARITAL_CODE                         String,"+
                "MARITAL_NAME                         String,"+
                "ETHNIC_CODE                          String,"+
                "ETHNIC                               String,"+
                "OCCUPATION_CODE                      String,"+
                "OCCUPATION                           String,"+
                "DOC_CREATE_TIME                      String,"+
                "DOC_SUBMIT_TIME                      String,"+
                "AUTHOR_CODE                          String,"+
                "AUTHOR_NAME                          String,"+
                "REPRESENTOR_RELATION_CODE            String,"+
                "PRESENTER_ID_NUM                     String,"+
                "RELATIONSHIP_NAME                    String,"+
                "REPRESENTOR_RELIABLE_FLAG            String,"+
                "CASE_REPRESENTOR_NAME                String,"+
                "CUSTODIAN_NAME                       String,"+
                "CUSTODIAN_CODE                       String,"+
                "FINAL_AUDITOR_SIGNDATE               String,"+
                "DOC_FINAL_AUDITORCODE                String,"+
                "DOC_FINAL_AUDITOR                    String,"+
                "ATTEND_SIGNDATE                      String,"+
                "RECEPTION_DOCTOR_ID                  String,"+
                "RECEPTION_DOCTOR_SIGN_NAME           String,"+
                "RESIDENT_SIGNDATE                    String,"+
                "RESIDENT_DOCTOR_ID                   String,"+
                "RESIDENT_DOCTOR_SIGN_NAME            String,"+
                "ATTENDING_SIGNDATE                   String,"+
                "ATTENDING_DOCTOR_ID                  String,"+
                "ATTENDING_DOCTOR_SIGN_NAME           String,"+
                "ADMIT_DATETIME                       String,"+
                "BED_NO                               String,"+
                "BED_NAME                             String,"+
                "ROOM_NO                              String,"+
                "ROOM_NAME                            String,"+
                "DEPT_ID                              String,"+
                "DEPT_NAME                            String,"+
                "WARD_ID                              String,"+
                "WARD_NAME                            String,"+
                "CHIEF_COMPLAINT                      String,"+
                "PRESENT_ILLNESS_HISTORY              String,"+
                "GENERAL_HEALTH_FLAG                  String,"+
                "DISEASES_HISTORY                     String,"+
                "INFECTIOUS_DISEASE_HISTORY           String,"+
                "OBSTETRICAL_HISTORY                  String,"+
                "ALLERGIC_HISTORY                     String,"+
                "OPERATION_HISTORY                    String,"+
                "VACCINATION_HISTORY                  String,"+
                "TRANSFUSION_HISTORY                  String,"+
                "PERSONAL_HISTORY                     String,"+
                "BIRTH_HISTORY                        String,"+
                "MENSTRUAL_HISTORY                    String,"+
                "FAMILY_HISTORY                       String,"+
                "TEMPERATURE_VOLUME                   String,"+
                "PULSE_RATE                           String,"+
                "RESPIRATORY_RATE                     String,"+
                "SYSTOLIC_PRESSURE                    String,"+
                "DIASTOLIC_PRESSURE                   String,"+
                "PATIENT_HIGHT                        String,"+
                "PATIENT_WEIGHT                       String,"+
                "LYMPHONODUS_EXAM_RESULT              String,"+
                "ORGANS_EXAM_RESULT                   String,"+
                "NECK_EXAM_RESULT                     String,"+
                "PE_CHEST_EXAM                        String,"+
                "BELLY_EXAM_RESULT                    String,"+
                "DIGTAL_EXAM_RESULT                   String,"+
                "GENITAL_CHECK_RESULT                 String,"+
                "SPINE_EXAM_RESULT                    String,"+
                "ARMS_EXAM_RESULT                     String,"+
                "NERVOUS_SYSTEM_RESULT                String,"+
                "SPECIALIST_CASE                      String,"+
                "ASSIST_EXAM_TEST                     String,"+
                "CHINA_MED_OBSERVE_RESULT             String,"+
                "THERAPEUTIC_PRINCIPLE                String,"+
                "VISIT_DATE                           String,"+
                "PREL_DIAG_DATE                       String,"+
                "PREL_DIAG_ICD_CODE                   String,"+
                "PREL_DIAG_ICD_NAME                   String,"+
                "DIAG_ORDER                           String,"+
                "PREL_CHINA_DIAG_DATE                 String,"+
                "PREL_CHINA_DISEASE_CODE              String,"+
                "PREL_CHINA_DISEASE_NAME              String,"+
                "DIAG_ORDER_CZY                       String,"+
                "CORRECT_DIAG_DATE                    String,"+
                "CORRECT_DIAG_WESTERN_NAME            String,"+
                "CORRECT_DIAG_WESTERN_CODE            String,"+
                "DIAG_ORDER_XXY                       String,"+
                "CORRECT_CHINA_DIAG_DATE              String,"+
                "CORRECT_DIAG_BMMEDICINE_CODE         String,"+
                "CORRECT_DIAG_BMMEDICINE_NAME         String,"+
                "DIAG_ORDER_XZY                       String,"+
                "SUPPLEMENT_DIAG_DATE                 String,"+
                "SUPPLEMENT_DIAG_WESTERN_NAME         String,"+
                "SUPPLEMENT_DIAG_WESTERN_CODE         String,"+
                "DIAG_ORDER_BXY                       String," +
                "VISIT_ID                             String,"+
                "primary key (RECORD_ID) not enforced" +
                ")"+
                FlinkSinkUtil.getUpsertKafkaWith(Constant.TOPIC_DWD_MHS_IPT_ADMISSIONNOTE));
        // 导入数据
        tableEnv.executeSql("insert into target select * from join_result");
    }

    private void joinTables(StreamTableEnvironment tableEnv) {
        tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(30 * 60));
        Table joinResult = tableEnv.sqlQuery("select " +
                "s.unique_id                                                          AS RECORD_ID,"+
                "hospital_name                                                        AS HOSPITAL_NAME,"+
                "hospital_code                                                        AS HOSPITAL_CODE,"+
                "age_unit                                                             AS AGE_UNIT,"+
                "age                                                                  AS AGE,"+
                "sex                                                                  AS SEX_NAME,"+
                "sex_code                                                             AS SEX_CODE,"+
                "identify_no                                                          AS ID_CARD_NUMBER,"+
                "patient_typecode                                                     AS PATIENT_VISIT_CATEGORY_CODE,"+
                "patient_type                                                         AS PATIENT_VISIT_CATEGORY,"+
                "visit_id                                                             AS INPATIENT_VISIT_FLOW_ID,"+
                "inpatient                                                            AS INPATIENT_NO,"+
                "patient_id                                                           AS PATIENT_ID,"+
                "patient_name                                                         AS PATIENT_NAME,"+
                "s.domain_id                                                          AS DOMAIN_ID,"+
                "visit_times                                                          AS VISIT_TIMES,"+
                "current_door_number                                                  AS CURRENT_DOOR_NUMBER,"+
                "current_village                                                      AS CURRENT_VILLAGE,"+
                "current_township                                                     AS CURRENT_TOWNSHIP,"+
                "current_county                                                       AS CURRENT_COUNTY,"+
                "current_city                                                         AS CURRENT_CITY,"+
                "current_province                                                     AS CURRENT_PROVINCE,"+
                "marital_code                                                         AS MARITAL_CODE,"+
                "marital_name                                                         AS MARITAL_NAME,"+
                "ethnic_code                                                          AS ETHNIC_CODE,"+
                "ethnic                                                               AS ETHNIC,"+
                "occupation_code                                                      AS OCCUPATION_CODE,"+
                "occupation                                                           AS OCCUPATION,"+
                "write_time                                                           AS DOC_CREATE_TIME,"+
                "effective_time                                                       AS DOC_SUBMIT_TIME,"+
                "author_code                                                          AS AUTHOR_CODE,"+
                "author_name                                                          AS AUTHOR_NAME,"+
                "relationship_code                                                    AS REPRESENTOR_RELATION_CODE,"+
                "presenter_id_num                                                     AS PRESENTER_ID_NUM,"+
                "relationship_name                                                    AS RELATIONSHIP_NAME,"+
                "statement_reliability_mark                                           AS REPRESENTOR_RELIABLE_FLAG,"+
                "history_narrator                                                     AS CASE_REPRESENTOR_NAME,"+
                "custodian_code                                                       AS CUSTODIAN_NAME,"+
                "custodian_name                                                       AS CUSTODIAN_CODE,"+
                "final_auditor_signdate                                               AS FINAL_AUDITOR_SIGNDATE,"+
                "doc_final_auditorcode                                                AS DOC_FINAL_AUDITORCODE,"+
                "doc_final_auditor                                                    AS DOC_FINAL_AUDITOR,"+
                "attend_signdate                                                      AS ATTEND_SIGNDATE,"+
                "s.admission_code                                                     AS RECEPTION_DOCTOR_ID,"+
                "s.admission_name                                                     AS RECEPTION_DOCTOR_SIGN_NAME,"+
                "resident_signdate                                                    AS RESIDENT_SIGNDATE,"+
                "resident_code                                                        AS RESIDENT_DOCTOR_ID,"+
                "resident_name                                                        AS RESIDENT_DOCTOR_SIGN_NAME,"+
                "attending_signdate                                                   AS ATTENDING_SIGNDATE,"+
                "attending_code                                                       AS ATTENDING_DOCTOR_ID,"+
                "attending_name                                                       AS ATTENDING_DOCTOR_SIGN_NAME,"+
                "admission_date                                                       AS ADMIT_DATETIME,"+
                "bed_no                                                               AS BED_NO,"+
                "bed_name                                                             AS BED_NAME,"+
                "ward_id                                                              AS ROOM_NO,"+
                "ward_name                                                            AS ROOM_NAME,"+
                "dept_code                                                            AS DEPT_ID,"+
                "dept_name                                                            AS DEPT_NAME,"+
                "wards_id                                                             AS WARD_ID,"+
                "wards_name                                                           AS WARD_NAME,"+
                "chief_complaint                                                      AS CHIEF_COMPLAINT,"+
                "current_history                                                      AS PRESENT_ILLNESS_HISTORY,"+
                "general_health_mark                                                  AS GENERAL_HEALTH_FLAG,"+
                "disease_history                                                      AS DISEASES_HISTORY,"+
                "infectious_mark                                                      AS INFECTIOUS_DISEASE_HISTORY,"+
                "infectious_disease_history                                           AS OBSTETRICAL_HISTORY,"+
                "marriage_history                                                     AS ALLERGIC_HISTORY,"+
                "allergy_history                                                      AS OPERATION_HISTORY,"+
                "surgery_history                                                      AS VACCINATION_HISTORY,"+
                "vaccination_history                                                  AS TRANSFUSION_HISTORY,"+
                "blood_history                                                        AS PERSONAL_HISTORY,"+
                "personal_history                                                     AS BIRTH_HISTORY,"+
                "menstrual_history                                                    AS MENSTRUAL_HISTORY,"+
                "family_history                                                       AS FAMILY_HISTORY,"+
                "temperature                                                          AS TEMPERATURE_VOLUME,"+
                "pulse_rate                                                           AS PULSE_RATE,"+
                "breathe                                                              AS RESPIRATORY_RATE,"+
                "systolic                                                             AS SYSTOLIC_PRESSURE,"+
                "diastolic                                                            AS DIASTOLIC_PRESSURE,"+
                "length                                                               AS PATIENT_HIGHT,"+
                "weight                                                               AS PATIENT_WEIGHT,"+
                "pe_all_exam                                                          AS LYMPHONODUS_EXAM_RESULT,"+
                "pe_ho_exam                                                           AS ORGANS_EXAM_RESULT,"+
                "pe_neck_exam                                                         AS NECK_EXAM_RESULT,"+
                "pe_chest_exam                                                        AS PE_CHEST_EXAM,"+
                "celiac_exam                                                          AS BELLY_EXAM_RESULT,"+
                "fingerprint_exam                                                     AS DIGTAL_EXAM_RESULT,"+
                "genitals_result                                                      AS GENITAL_CHECK_RESULT,"+
                "spinal_exam                                                          AS SPINE_EXAM_RESULT,"+
                "limb_exam                                                            AS ARMS_EXAM_RESULT,"+
                "pe_nervous_exame                                                     AS NERVOUS_SYSTEM_RESULT,"+
                "specialist_situation                                                 AS SPECIALIST_CASE,"+
                "item_result                                                          AS ASSIST_EXAM_TEST,"+
                "tcm_four_diagnosis                                                   AS CHINA_MED_OBSERVE_RESULT,"+
                "principle_and_method                                                 AS THERAPEUTIC_PRINCIPLE,"+
                "visit_date                                                           AS VISIT_DATE,"+
                "case when diag_type='初步诊断-西医诊断' then diag_date else '' end     AS PREL_DIAG_DATE,"+
                "case when diag_type='初步诊断-西医诊断' then diag_code else '' end     AS PREL_DIAG_ICD_CODE,"+
                "case when diag_type='初步诊断-西医诊断' then diag_name else '' end     AS PREL_DIAG_ICD_NAME,"+
                "case when diag_type='初步诊断-西医诊断' then diag_order else '' end    AS DIAG_ORDER,"+
                "case when diag_type='初步诊断-中医诊断' then diag_date else '' end     AS PREL_CHINA_DIAG_DATE,"+
                "case when diag_type='初步诊断-中医诊断' then diag_code else '' end     AS PREL_CHINA_DISEASE_CODE,"+
                "case when diag_type='初步诊断-中医诊断' then diag_name else '' end     AS PREL_CHINA_DISEASE_NAME,"+
                "case when diag_type='初步诊断-中医诊断' then diag_order else '' end    AS DIAG_ORDER_CZY,"+
                "case when diag_type='修正诊断-西医诊断' then diag_date else '' end     AS CORRECT_DIAG_DATE,"+
                "case when diag_type='修正诊断-西医诊断' then diag_code else '' end     AS CORRECT_DIAG_WESTERN_NAME,"+
                "case when diag_type='修正诊断-西医诊断' then diag_name else '' end     AS CORRECT_DIAG_WESTERN_CODE,"+
                "case when diag_type='修正诊断-西医诊断' then diag_order else '' end    AS DIAG_ORDER_XXY,"+
                "case when diag_type='修正诊断-中医诊断' then diag_date else '' end     AS CORRECT_CHINA_DIAG_DATE,"+
                "case when diag_type='修正诊断-中医诊断' then diag_code else '' end     AS CORRECT_DIAG_BMMEDICINE_CODE,"+
                "case when diag_type='修正诊断-中医诊断' then diag_name else '' end     AS CORRECT_DIAG_BMMEDICINE_NAME,"+
                "case when diag_type='修正诊断-中医诊断' then diag_order else '' end    AS DIAG_ORDER_XZY,"+
                "case when diag_type='补充诊断-西医诊断' then diag_date else '' end     AS SUPPLEMENT_DIAG_DATE,"+
                "case when diag_type='补充诊断-西医诊断' then diag_code else '' end     AS SUPPLEMENT_DIAG_WESTERN_NAME,"+
                "case when diag_type='补充诊断-西医诊断' then diag_name else '' end     AS SUPPLEMENT_DIAG_WESTERN_CODE,"+
                "case when diag_type='补充诊断-西医诊断' then diag_order else '' end    AS DIAG_ORDER_BXY," +
                "visit_id AS VISIT_ID "+
                "from hdsd00_13_02 s " +
                "left join tdy_list_diag a on a.unique_id=s.unique_id and a.xds_type=s.xds_type");
        tableEnv.createTemporaryView("join_result",joinResult);
    }

    private void createSourceTables(StreamTableEnvironment tableEnv) {
        tableEnv.executeSql("create table hdsd00_13_02 (" +
                "pk                                         String,"+
                "upload_time                                String,"+
                "status                                     String,"+
                "empi                                       String,"+
                "encounter_id                               String,"+
                "patient_id                                 String,"+
                "patient_domain                             String,"+
                "visit_domain                               String,"+
                "visit_id                                   String,"+
                "visit_times                                String,"+
                "unique_id                                  String,"+
                "xds_type                                   String,"+
                "xds_name                                   String,"+
                "effective_time                             String,"+
                "xds_version                                String,"+
                "domain_id                                  String,"+
                "patient_type                               String,"+
                "patient_typecode                           String,"+
                "inpatient                                  String,"+
                "current_door_number                        String,"+
                "current_village                            String,"+
                "current_township                           String,"+
                "current_county                             String,"+
                "current_city                               String,"+
                "current_province                           String,"+
                "identify_no                                String,"+
                "patient_name                               String,"+
                "sex_code                                   String,"+
                "sex                                        String,"+
                "marital_code                               String,"+
                "marital_name                               String,"+
                "ethnic_code                                String,"+
                "ethnic                                     String,"+
                "age                                        String,"+
                "age_unit                                   String,"+
                "occupation_code                            String,"+
                "occupation                                 String,"+
                "write_time                                 String,"+
                "author_code                                String,"+
                "author_name                                String,"+
                "relationship_code                          String,"+
                "presenter_id_num                           String,"+
                "relationship_name                          String,"+
                "history_narrator                           String,"+
                "custodian_name                             String,"+
                "custodian_code                             String,"+
                "final_auditor_signdate                     String,"+
                "doc_final_auditorcode                      String,"+
                "doc_final_auditor                          String,"+
                "attend_signdate                            String,"+
                "admission_code                             String,"+
                "admission_name                             String,"+
                "resident_signdate                          String,"+
                "resident_code                              String,"+
                "resident_name                              String,"+
                "attending_signdate                         String,"+
                "attending_code                             String,"+
                "attending_name                             String,"+
                "admission_date                             String,"+
                "bed_no                                     String,"+
                "bed_name                                   String,"+
                "ward_id                                    String,"+
                "ward_name                                  String,"+
                "dept_code                                  String,"+
                "dept_name                                  String,"+
                "wards_id                                   String,"+
                "wards_name                                 String,"+
                "hospital_code                              String,"+
                "hospital_name                              String,"+
                "chief_complaint                            String,"+
                "current_history                            String,"+
                "general_health_mark                        String,"+
                "disease_history                            String,"+
                "infectious_mark                            String,"+
                "infectious_disease_history                 String,"+
                "marriage_history                           String,"+
                "allergy_history                            String,"+
                "surgery_history                            String,"+
                "vaccination_history                        String,"+
                "blood_history                              String,"+
                "personal_history                           String,"+
                "menstrual_history                          String,"+
                "family_history                             String,"+
                "temperature                                String,"+
                "pulse_rate                                 String,"+
                "breathe                                    String,"+
                "systolic                                   String,"+
                "diastolic                                  String,"+
                "length                                     String,"+
                "weight                                     String,"+
                "general_exam                               String,"+
                "skin_mucosal_exam                          String,"+
                "pe_all_exam                                String,"+
                "pe_ho_exam                                 String,"+
                "pe_neck_exam                               String,"+
                "pe_chest_exam                              String,"+
                "celiac_exam                                String,"+
                "fingerprint_exam                           String,"+
                "genitals_result                            String,"+
                "spinal_exam                                String,"+
                "limb_exam                                  String,"+
                "pe_nervous_exame                           String,"+
                "specialist_situation                       String,"+
                "item_result                                String,"+
                "statement_reliability_mark                 String,"+
                "tcm_four_diagnosis                         String,"+
                "principle_and_method                       String,"+
                "visit_date                                 String "+
                ")"+
                FlinkSourceUtil.getKafkaWith("hdsd00_13_02","App_34_DwdMhsIptAdmissionnote"));

        super.readTdyListDiag(tableEnv,"App_34_DwdMhsIptAdmissionnote");


    }
}
